AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみた(AWS CDK v2)
こんにちは、CX事業本部 IoT事業部の若槻です。
Amazon Athenaを使用すると、SQLライクなクエリ文字列でS3 Bucketなどに格納されているデータを直接分析することが可能です。
今回は、AWS Step FunctionsからAthenaクエリ実行の開始および結果取得をしてみました。
やってみた
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_stepfunctions, aws_stepfunctions_tasks, aws_athena, RemovalPolicy, Duration, Stack, StackProps, } from 'aws-cdk-lib'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // Athenaクエリ結果格納バケット const athenaQueryResultBucket = new aws_s3.Bucket( this, 'athenaQueryResultBucket', { bucketName: `athena-query-result-${this.account}`, removalPolicy: RemovalPolicy.DESTROY, }, ); // Athenaワークグループ const athenaWorkGroup = new aws_athena.CfnWorkGroup( this, 'athenaWorkGroup', { name: 'athenaWorkGroup', workGroupConfiguration: { resultConfiguration: { outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`, }, }, }, ); // Athenaクエリ実行開始 const startAthenaQueryExecutionTask = new aws_stepfunctions_tasks.AthenaStartQueryExecution( this, 'startAthenaQueryExecutionTask', { queryString: aws_stepfunctions.JsonPath.stringAt( '$.executionInput.queryString', ), workGroup: athenaWorkGroup.name, resultPath: '$.startAthenaQueryExecutionTaskOutPut', }, ); // Athenaクエリ実行取得 const getAthenaQueryExecutionTask = new aws_stepfunctions_tasks.AthenaGetQueryExecution( this, 'getAthenaQueryExecutionTask', { queryExecutionId: aws_stepfunctions.JsonPath.stringAt( '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId', ), resultSelector: { queryState: aws_stepfunctions.JsonPath.stringAt( '$.QueryExecution.Status.State', ), }, resultPath: '$.getAthenaQueryExecutionTaskOutPut', }, ); // 10秒待機 const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', { time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)), }); // Athenaクエリ結果取得 const getAthenaQueryResultsTask = new aws_stepfunctions_tasks.AthenaGetQueryResults( this, 'getAthenaQueryResultsTask', { queryExecutionId: aws_stepfunctions.JsonPath.stringAt( '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId', ), }, ); // Athenaクエリが完了するまで待機 const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice( this, 'athenaQueryStateChoiceTask', ); athenaQueryStateChoiceTask.when( aws_stepfunctions.Condition.or( aws_stepfunctions.Condition.stringEquals( aws_stepfunctions.JsonPath.stringAt( '$.getAthenaQueryExecutionTaskOutPut.queryState', ), 'QUEUED', ), aws_stepfunctions.Condition.stringEquals( aws_stepfunctions.JsonPath.stringAt( '$.getAthenaQueryExecutionTaskOutPut.queryState', ), 'RUNNING', ), ), wait10seconds, ); athenaQueryStateChoiceTask.otherwise(getAthenaQueryResultsTask); // State Machine new aws_stepfunctions.StateMachine(this, 'stateMachine', { stateMachineName: 'stateMachine', definition: startAthenaQueryExecutionTask .next(wait10seconds) .next(getAthenaQueryExecutionTask) .next(athenaQueryStateChoiceTask), }); } }
- Athenaクエリの実行は非同期なので実行開始後に10秒待機し、実行が完了するまで10秒待機および実行結果の確認を繰り返します。
上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。
{ "StartAt": "startAthenaQueryExecutionTask", "States": { "startAthenaQueryExecutionTask": { "Next": "wait10seconds", "Type": "Task", "ResultPath": "$.startAthenaQueryExecutionTaskOutPut", "Resource": "arn:aws:states:::athena:startQueryExecution", "Parameters": { "QueryString.$": "$.executionInput.queryString", "ResultConfiguration": {}, "WorkGroup": "athenaWorkGroup" } }, "wait10seconds": { "Type": "Wait", "Seconds": 10, "Next": "getAthenaQueryExecutionTask" }, "athenaQueryStateChoiceTask": { "Type": "Choice", "Choices": [ { "Or": [ { "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState", "StringEquals": "QUEUED" }, { "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState", "StringEquals": "RUNNING" } ], "Next": "wait10seconds" } ], "Default": "getAthenaQueryResultsTask" }, "getAthenaQueryExecutionTask": { "Next": "athenaQueryStateChoiceTask", "Type": "Task", "ResultPath": "$.getAthenaQueryExecutionTaskOutPut", "ResultSelector": { "queryState.$": "$.QueryExecution.Status.State" }, "Resource": "arn:aws:states:::athena:getQueryExecution", "Parameters": { "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId" } }, "getAthenaQueryResultsTask": { "End": true, "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId" } } } }
動作確認
次のクエリで確認してみます。
SELECT ARRAY [1,2,3,4] AS items
クエリをコンソールで実行した様子です。結果でレコードが1つ取得できています。
次の入力を指定してステートマシンを実行します。
{ "executionInput": { "queryString": "SELECT ARRAY [1,2,3,4] AS items" } }
すると実行が成功しました。
getAthenaQueryResultsTask
のOutputを見ると、クエリ結果が取得できています!
{ "ResultSet": { "ResultSetMetadata": { "ColumnInfo": [ { "CaseSensitive": false, "CatalogName": "hive", "Label": "items", "Name": "items", "Nullable": "UNKNOWN", "Precision": 0, "Scale": 0, "SchemaName": "", "TableName": "", "Type": "array" } ] }, "Rows": [ { "Data": [ { "VarCharValue": "items" } ] }, { "Data": [ { "VarCharValue": "[1, 2, 3, 4]" } ] } ] }, "UpdateCount": 0 }
おわりに
AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみました。難なく実装ができたので流石はStep Functionsと言ったところです。
ただしathena:getQueryResults APIにより取得した実行結果のJson Objectはそのままでは扱いにくい形式なので、取得後にEvaluateExpressionタスクなどで上手いこと変換したいですね。
参考
- Querying arrays - Amazon Athena
- athena-Type-QueryExecutionStatus-State - QueryExecutionStatus - Athena
- athena-Type-QueryExecutionStatus-State - QueryExecutionStatus - Athena
- class AthenaGetQueryExecution (construct) · AWS CDK
以上